/*******************************************************************************
 * Package: com.song.bigdata.stream
 * Type:    MyAggregation
 * Date:    2022-10-28 17:32
 *
 * 
 *
 * You may not use this file except in compliance with the License.
 *******************************************************************************/
package com.song.bigdata.stream;

import com.song.bigdata.pojo.User;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 功能描述：聚合算子
 *
 * ⚫ sum()：在输入流上，对指定的字段做叠加求和的操作。
 * ⚫ min()：在输入流上，对指定的字段求最小值。
 * ⚫ max()：在输入流上，对指定的字段求最大值。
 *   例如对 main 中的数据执行 keyBy(user -> user.name).max("age")，依次输出：
 * User{name='song-user', age=12}
 * User{name='xian-user', age=92}
 * User{name='song-user', age=12}
 * User{name='song-user', age=110}
 * User{name='song-user', age=110}
 * User{name='xian-user', age=92}
 * ⚫ minBy()：与 min()类似，在输入流上针对指定字段求最小值。不同的是，min()只计
 * 算指定字段的最小值，其他字段会保留最初第一个数据的值；而 minBy()则会返回包
 * 含字段最小值的整条数据。
 * ⚫ maxBy()：与 max()类似，在输入流上针对指定字段求最大值，但会返回包含该最大值的整条数据
 * （即直接把当前这条数据整体作为最大值输出）。两者的区别与 min()/minBy()完全一致。
 * @author Songxianyang
 * @date 2022-10-28 17:32
 */
public class MyAggregation {
    /**
     * Builds and runs the "most active user" job on a small, fixed set of users.
     *
     * <p>Each user is turned into {@code (name, 1L)} and summed per name into a running count.
     * Every running count is then routed to a single key, where the largest count seen so far
     * is kept. Each incoming user therefore prints the current leader, using the sink
     * identifier {@code "haha"}.
     *
     * @param args unused
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1: every operator runs as a single instance, which keeps the demo output easy to follow.
        environment.setParallelism(1);
        DataStreamSource<User> streamSource = environment.fromElements(
                new User("song-user", 12),
                new User("xian-user", 92),
                new User("song-user", 11),
                new User("song-user", 110),
                new User("song-user", 90),
                new User("xian-user", 10)
        );

        // Find the most active user and how many times it has appeared so far.
        streamSource
                .map(new ToCountOfOne())
                .keyBy(data -> data.f0)
                .reduce(new SumCounts())
                // Send every per-user count to one constant key so a single reducer sees all of them.
                .keyBy(r -> true)
                .reduce(new KeepHigherCount())
                .print("haha");

        // Plain built-in keyed aggregation (the max() demo described in the class Javadoc); uncomment to try it:
        // SingleOutputStreamOperator<User> age = streamSource.keyBy(user -> user.name).max("age");
        // age.print();
        environment.execute();
    }

    /**
     * Maps each user to {@code (name, 1L)} so occurrences can be summed per name.
     *
     * <p>Kept as a class rather than a lambda: Flink cannot infer the erased
     * {@code Tuple2<String, Long>} return type of a lambda without an explicit {@code returns(...)}.
     */
    private static final class ToCountOfOne implements MapFunction<User, Tuple2<String, Long>> {
        private static final long serialVersionUID = 5463963779123778573L;

        @Override
        public Tuple2<String, Long> map(User user) {
            return Tuple2.of(user.name, 1L);
        }
    }

    /**
     * Adds two partial counts for the same user name.
     *
     * <p>The name ({@code f0}) is taken from the value reduced so far.
     */
    private static final class SumCounts implements ReduceFunction<Tuple2<String, Long>> {
        private static final long serialVersionUID = -3582495696800374320L;

        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> accumulated, Tuple2<String, Long> incoming) {
            return Tuple2.of(accumulated.f0, accumulated.f1 + incoming.f1);
        }
    }

    /**
     * Keeps whichever tuple carries the larger count.
     *
     * <p>The comparison is a strict {@code >}, so on a tie the incoming tuple wins. The latest
     * update of a tied leader is therefore the one that gets printed.
     */
    private static final class KeepHigherCount implements ReduceFunction<Tuple2<String, Long>> {
        private static final long serialVersionUID = -815926475702841764L;

        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> accumulated, Tuple2<String, Long> incoming) {
            return accumulated.f1 > incoming.f1 ? accumulated : incoming;
        }
    }
}
